package com.hc.server.user.cosumer;

import com.hc.server.common.constant.MQConstant;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.nio.charset.StandardCharsets;


/**
 * Manually consumes historical data.
 *
 * <p>Stand-alone tool: starts a RocketMQ push consumer in the
 * {@link MQConstant#LOGIN_MSG_GROUP_UID} consumer group, subscribes to the
 * {@code login_send_msg_uid} topic and prints every received message body to stdout.
 *
 * <p>NOTE(review): no consume-from-where position is configured here, so the starting
 * offset is left to the client default — confirm this matches the "historical data" intent.
 *
 * Created by wdj on 2024/10/20
 */
public class ManualConsumer {
    /** Address of the name server the consumer connects to. */
    private static final String NAMESRV_ADDR = "127.0.0.1:9876";

    /** Topic the consumer subscribes to. */
    private static final String TOPIC = "login_send_msg_uid";

    /** Subscription expression passed to {@code subscribe}; {@code "*"} does not filter by tag. */
    private static final String SUB_EXPRESSION = "*";

    /**
     * Configures and starts the consumer. The method returns once the consumer has been
     * started; messages are then delivered to the registered listener.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if subscribing to the topic or starting the consumer fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(MQConstant.LOGIN_MSG_GROUP_UID);
        consumer.setNamesrvAddr(NAMESRV_ADDR);
        consumer.subscribe(TOPIC, SUB_EXPRESSION);

        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                // Decode explicitly as UTF-8: the no-charset String constructor uses the
                // platform default, which can garble non-ASCII payloads on some hosts.
                System.out.println("消费消息: " + new String(msg.getBody(), StandardCharsets.UTF_8));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        consumer.start();
        // Release the consumer's resources when the JVM exits (e.g. on Ctrl+C) instead of
        // leaving the client to be killed without a shutdown call.
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::shutdown, "manual-consumer-shutdown"));
        System.out.println("消费者启动成功");
    }
}
